/*
 * Copyright 2022 Huawei Cloud Computing Technology Co., Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.huawei.cloudphone.datacenter;

import com.huawei.cloudphone.common.CASLog;
import com.huawei.cloudphone.jniwrapper.JNIWrapper;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CasRecvPktDispatcher {
    private static final int MAX_BUF_LEN = 1048576; // 1MB
    private static final String TAG = "CasRecvPktDispatcher";
    private static Map<Byte, NewPacketCallback> newPacketCallback = new ConcurrentHashMap<>();
    private volatile boolean stopFlag = false;
    private volatile boolean stopped = false;

    public void addNewPacketCallback(Byte tag, NewPacketCallback callback) {
        CASLog.e(TAG, "callback added " + tag);
        newPacketCallback.put(tag, callback);
    }

    public void deleteNewPacketCallback(Byte tag) {
        CASLog.e(TAG, "callback removed " + tag);
        newPacketCallback.remove(tag);
    }

    public void stopBlocked() {
        stopFlag = true;
        while (!stopped) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                CASLog.i(TAG, "sleep interrupted");
            }
        }
    }

    public void start() {
        NewPacketCallback callback = newPacketCallback.get(JNIWrapper.AUDIO);
        if (callback != null) {
            new Thread(new ConsumerThread(JNIWrapper.AUDIO, callback))
                .start();
        }

        callback = newPacketCallback.get(JNIWrapper.RECORDER);
        if (callback != null) {
            new Thread(new ConsumerThread(JNIWrapper.RECORDER, callback))
                .start();
        }

        new Thread(new Runnable() {
            @Override
            public void run() {
                byte[] recvBuf = new byte[MAX_BUF_LEN];

                while (!stopFlag) {
                    for (Map.Entry<Byte, NewPacketCallback> entry : newPacketCallback.entrySet()) {
                        // AudioTrack and AudioRecord has some issue in Android 8.0, pick audio out.
                        if (entry.getKey().equals(JNIWrapper.AUDIO) || entry.getKey().equals(JNIWrapper.RECORDER)) {
                            continue;
                        }

                        int packetLen = JNIWrapper.recvData(entry.getKey(), recvBuf, recvBuf.length);
                        if (packetLen <= 0) {
                            continue;
                        }

                        byte[] copyData = new byte[packetLen];
                        System.arraycopy(recvBuf, 0, copyData, 0, packetLen);
                        entry.getValue().onNewPacket(copyData);
                    }

                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        CASLog.i(TAG, "sleep interrupted, it's OK");
                    }
                }

                stopped = true;
            }
        }).start();
    }

    class ConsumerThread implements Runnable {
        NewPacketCallback mCallback;
        Byte mType;

        ConsumerThread(Byte datatype, NewPacketCallback callback) {
            mCallback = callback;
            mType = datatype;
        }

        @Override
        public void run() {
            byte[] recvBuf = new byte[MAX_BUF_LEN];

            while (!stopFlag) {
                int packetLen = JNIWrapper.recvData(mType, recvBuf, recvBuf.length);
                if (packetLen <= 0) {
                    try {
                        Thread.sleep(3);
                    } catch (InterruptedException e) {
                        CASLog.e(TAG, "sleep interrupted.");
                    }
                    continue;
                }

                byte[] copyData = new byte[packetLen];
                System.arraycopy(recvBuf, 0, copyData, 0, packetLen);
                mCallback.onNewPacket(copyData);
            }
        }
    }
}
